package fun.easycode.datastream;

import fun.easycode.jointblock.util.JacksonUtil;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

/**
 * 数据全量启动消息生产者
 * @author xuzhen97
 */
public class DataCompleteStartProducer {
    private final DefaultMQProducer producer;
    private final DataStreamProperties properties;

    public DataCompleteStartProducer(DataStreamProperties properties){
        this.properties = properties;
        this.producer = createProducer();
    }

    /**
     * 启动生产者
     * @throws MQClientException 启动异常
     */
    public void start() throws MQClientException {
        producer.start();
    }

    /**
     * 发送消息
     * @param taskId 任务id
     */
    public void produce(String taskId){
        DataCompleteStartEntry entry = new DataCompleteStartEntry();
        entry.setTaskId(taskId);
        try {
            producer.send(new Message(
                    properties.getRocketMq().getDefaultCompleteTopic(),
                    JacksonUtil.toJson(entry).getBytes()
            ));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 创建生产者
     * @return 生产者
     */
    private DefaultMQProducer createProducer(){
        // 一个transform对应一个生产者, 所以生产者的名称就是transform的名称
        DefaultMQProducer producer = new DefaultMQProducer("DataCompleteStartProducer");
        producer.setNamesrvAddr(properties.getRocketMq().getNameSrvAddr());
        return producer;
    }
}
